package gu.simplemq;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.base.Strings;
import gu.simplemq.pool.BaseMQPool;
/**
 * Abstract base implementation of a message dispatcher.<br>
 * Holds at most one connection borrowed from a {@link BaseMQPool}; the connection is
 * borrowed on the first {@link #subscribe(String...)} and handed back by {@link #close()}.
 * Subclasses supply the transport specific work through {@link #doInit()}, {@link #doUninit()},
 * {@link #doSub(String)} and {@link #doUnsub(String)}.
 *
 * @param <C> type of the connection object supplied by the pool
 * @author guyadong
 *
 */
public abstract class BaseMQDispatcher<C> extends ChannelDispatcher implements AutoCloseable,IConsumer {
	/** Pool the connection is borrowed from and released to. */
	protected final BaseMQPool<C> pool;
	/** Connection currently held by this dispatcher, {@code null} while none is borrowed. */
	private volatile C connection;
	/** Set once by {@link #close()} so that the shutdown sequence runs only one time. */
	private final AtomicBoolean closed = new AtomicBoolean(false);

	/**
	 * @param pool pool that supplies connections, must not be {@code null}
	 * @throws NullPointerException if {@code pool} is {@code null}
	 */
	protected BaseMQDispatcher(BaseMQPool<C> pool) {
		super();
		this.pool = checkNotNull(pool,"pool is null");
	}

	/**
	 * Returns the connection currently held by this dispatcher.
	 *
	 * @return the borrowed connection, never {@code null}
	 * @throws NullPointerException if no connection has been borrowed yet (see {@link #init()})
	 */
	protected C getConnection() {
		return checkNotNull(connection,"connection is uninitialized");
	}

	/**
	 * Hook invoked by {@link #init()} right after a connection was borrowed;
	 * {@link #getConnection()} is usable inside it. The default does nothing.
	 *
	 * @throws Exception if the subclass specific initialization fails
	 */
	protected void doInit() throws Exception{}

	/**
	 * Hook invoked by {@link #uninit()} right before the connection is released;
	 * {@link #getConnection()} is still usable inside it. The default does nothing.
	 *
	 * @throws Exception if the subclass specific cleanup fails
	 */
	protected void doUninit() throws Exception{}

	/**
	 * Performs the transport specific subscription of one channel.
	 *
	 * @param channel channel name, already checked to be neither {@code null} nor empty
	 * @throws Exception if the subscription fails
	 */
	abstract protected void doSub(String channel) throws Exception;

	/**
	 * Performs the transport specific unsubscription of one channel.
	 *
	 * @param channel channel name, already checked to be non-null
	 * @throws Exception if the unsubscription fails
	 */
	abstract protected void doUnsub(String channel) throws Exception;

	/**
	 * Makes sure this dispatcher holds a connection.<br>
	 * When none is held, one is borrowed from {@link #pool} and {@link #doInit()} is run;
	 * when one is already held the call does nothing.
	 * If {@link #doInit()} fails, the freshly borrowed connection is handed back to the pool
	 * and the dispatcher returns to the "no connection" state, so a later call can retry
	 * instead of silently reusing a half-initialized connection.
	 *
	 * @throws Exception if borrowing the connection or {@link #doInit()} fails
	 */
	protected void init() throws Exception{
		// double check: cheap unsynchronized test first, authoritative test under the lock
		if(null == connection){
			synchronized (this) {
				if(null == connection){
					C borrowed = pool.borrow();
					// must be published before doInit() because the hook may call getConnection()
					connection = borrowed;
					try{
						doInit();
					}catch(Throwable e){
						connection = null;
						try{
							// same rule as uninit(): a closed pool no longer accepts releases
							if(!pool.isClosed()){
								pool.release(borrowed);
							}
						}catch(Throwable releaseError){
							// keep the initialization failure as the primary error
							e.addSuppressed(releaseError);
						}
						throw e;
					}
				}
			}
		}
	}

	/**
	 * Gives up the connection held by this dispatcher.<br>
	 * Runs {@link #doUninit()}, releases the connection to {@link #pool} unless the pool is
	 * already closed, and resets the held connection to {@code null}. The release and the reset
	 * are performed even when {@link #doUninit()} throws. Does nothing when no connection is held.
	 *
	 * @throws Exception if {@link #doUninit()} or releasing the connection fails
	 */
	protected void uninit() throws Exception{
		// double check: cheap unsynchronized test first, authoritative test under the lock
		if(connection != null){
			synchronized (this) {
				C current = connection;
				if(current != null){
					try{
						doUninit();
					}finally{
						try{
							// skip the release when the pool has already been closed
							if(!pool.isClosed()){
								pool.release(current);
							}
						}finally{
							connection = null;
						}
					}
				}
			}
		}
	}

	/**
	 * Registers the channels with the superclass, makes sure a connection is held and
	 * calls {@link #doSub(String)} for every channel returned by the superclass.<br>
	 * This is best-effort: any failure (connection setup, an invalid channel name,
	 * {@link #doSub(String)}) stops processing of the remaining channels, is printed to the
	 * standard error stream and is NOT propagated to the caller.
	 *
	 * @param channels channel names to subscribe
	 * @return the array returned by the superclass {@code subscribe}
	 */
	@Override
	public final String[] subscribe(String... channels) {
		synchronized (this) {
			channels = super.subscribe(channels);
			try{
				init();
				for(String name:channels){
					checkState(!Strings.isNullOrEmpty(name),"channel name is null or empty");
					doSub(name);
				}
			}catch(Throwable  e){
				// deliberate best-effort: report and carry on, callers do not expect an exception here
				e.printStackTrace();
			}
			return channels;
		}
	}

	/**
	 * Unregisters the channels with the superclass and, unless the pool is already closed,
	 * calls {@link #doUnsub(String)} for every channel returned by the superclass.<br>
	 * A failure for one channel does not prevent the remaining channels from being processed;
	 * it is printed to the standard error stream unless this dispatcher is being closed.
	 *
	 * @param channels channel names to unsubscribe
	 * @return the array returned by the superclass {@code unsubscribe}
	 */
	@Override
	public final String[] unsubscribe(String... channels) {
		synchronized (this) {
			channels = super.unsubscribe(channels);
			if(!pool.isClosed()){
				for(String name : channels){
					try{
						doUnsub(checkNotNull(name,"channel name is null"));
					}catch (Throwable e) {
						// errors raised while shutting down are expected noise, stay quiet then
						if(!closed.get()){
							e.printStackTrace();
						}
					}
				}
			}
			return channels;
		}
	}
	
	/**
	 * Closes this dispatcher: calls {@link #unsubscribe(String...)} without arguments and then
	 * gives the connection back through {@link #uninit()}.<br>
	 * Only the first call has an effect. The connection is released even if the
	 * unsubscribe step throws.
	 *
	 * @throws Exception if unsubscribing or releasing the connection fails
	 */
	@Override
	public final void close() throws Exception{
		if(closed.compareAndSet(false, true)){
			try{
				unsubscribe();
			}finally{
				// closed is already true, so this is the only chance to return the connection
				uninit();
			}
		}
	}

	/**
	 * @return the simple class name followed by the pool, e.g. {@code Name [pool=...]}
	 */
	@Override
	public String toString() {
		StringBuilder builder = new StringBuilder();
		builder.append(getClass().getSimpleName()).append(" [pool=");
		builder.append(pool);
		builder.append("]");
		return builder.toString();
	}
}
